Kafka环境需要先安装java和zookeeper,步骤比较繁琐,使用docker就可以简化很多。
安装docker
|
|
测试是否安装成功:
sudo docker run hello-world
在Docker环境下部署Kafka
pull镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
开启服务
docker run -d –name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
docker run -d –name kafka –publish 9092:9092 –link zookeeper –env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 –env KAFKA_ADVERTISED_HOST_NAME=192.168.105.223 –env KAFKA_ADVERTISED_PORT=9092 –volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
以上步骤就可以启动Kafka了,然后执行Docker ps,找到kafka的Container ID:
sudo docker exec -it xxx(Container ID) /bin/bash 进入容器内部
cd /opt/kafka_2.12-2.4.1/ 进入kafka默认目录
kafka-topics.sh –create –zookeeper zookeeper:2181 –replication-factor 1 –partitions 1 –topic mykafka 创建一个主题
生产者消费者python测试
安装python包:
apt install python-kafka python3-kafka
生产者代码:
因为我使用了python3,如果不加上.encode(‘utf-8’)就会报错,但如果使用python2就不会报错
|
|
消费者代码:
|
|
然后开两个终端,一个运行消费者代码,一个运行生产者代码。
消费者:
ConsumerRecord(topic=’mykafka’, partition=0, offset=154, timestamp=1587009112770, timestamp_type=0, key=None, value=b’{“top”:97}’, checksum=-792520670, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic=’mykafka’, partition=0, offset=155, timestamp=1587009113271, timestamp_type=0, key=None, value=b’{“top”:98}’, checksum=-1791183436, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic=’mykafka’, partition=0, offset=156, timestamp=1587009113772, timestamp_type=0, key=None, value=b’{“top”:99}’, checksum=-10726817, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic=’mykafka’, partition=0, offset=157, timestamp=1587009114273, timestamp_type=0, key=None, value=b’{“top”:100}’, checksum=1039172749, serialized_key_size=-1, serialized_value_size=11)
生产者:
send{“top” :98}
done
send{“top”:99}
done
send{“top”:100}
done